home *** CD-ROM | disk | FTP | other *** search
/ Enter 2006 September / Enter 09 2006.iso / Internet / SpamExperts Home 1.1 / SpamExperts Home.exe / lib / spamexperts.modules / ZODB / FileStorage / fspack.pyc (.txt) < prev   
Encoding:
Python Compiled Bytecode  |  2006-07-14  |  15.5 KB  |  576 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.4)
  3.  
  4. '''FileStorage helper to perform pack.
  5.  
  6. A storage contains an ordered set of object revisions.  When a storage
  7. is packed, object revisions that are not reachable as of the pack time
  8. are deleted.  The notion of reachability is complicated by
  9. backpointers -- object revisions that point to earlier revisions of
  10. the same object.
  11.  
  12. An object revisions is reachable at a certain time if it is reachable
  13. from the revision of the root at that time or if it is reachable from
  14. a backpointer after that time.
  15. '''
  16. import os
  17. from ZODB.serialize import referencesf
  18. from ZODB.utils import p64, u64, z64
  19. from ZODB.fsIndex import fsIndex
  20. from ZODB.FileStorage.format import FileStorageFormatter, CorruptedDataError, DataHeader, TRANS_HDR_LEN
  21.  
  22. class DataCopier(FileStorageFormatter):
  23.     '''Mixin class for copying transactions into a storage.
  24.  
  25.     The restore() and pack() methods share a need to copy data records
  26.     and update pointers to data in earlier transaction records.  This
  27.     class provides the shared logic.
  28.  
  29.     The mixin extends the FileStorageFormatter with a copy() method.
  30.     It also requires that the concrete class provides the following
  31.     attributes:
  32.  
  33.     _file -- file with earlier destination data
  34.     _tfile -- destination file for copied data
  35.     _pos -- file pos of destination transaction
  36.     _tindex -- maps oid to data record file pos
  37.     _tvindex -- maps version name to data record file pos
  38.  
  39.     _tindex and _tvindex are updated by copy().
  40.  
  41.     The copy() method does not do any locking.
  42.     '''
  43.     
  44.     def _txn_find(self, tid, stop_at_pack):
  45.         pos = self._pos
  46.         while pos > 4:
  47.             self._file.seek(pos - 8)
  48.             pos = pos - u64(self._file.read(8)) - 8
  49.             self._file.seek(pos)
  50.             h = self._file.read(TRANS_HDR_LEN)
  51.             _tid = h[:8]
  52.             if _tid == tid:
  53.                 return pos
  54.             
  55.             if stop_at_pack:
  56.                 if h[16] == 'p':
  57.                     break
  58.                 
  59.             h[16] == 'p'
  60.         raise UndoError(None, 'Invalid transaction id')
  61.  
  62.     
  63.     def _data_find(self, tpos, oid, data):
  64.         h = self._read_txn_header(tpos)
  65.         tend = tpos + h.tlen
  66.         pos = self._file.tell()
  67.         while pos < tend:
  68.             h = self._read_data_header(pos)
  69.             if h.oid == oid:
  70.                 if h.plen == 0:
  71.                     return pos
  72.                 
  73.                 if h.plen != len(data):
  74.                     error('Mismatch between data and backpointer at %d', pos)
  75.                     return 0
  76.                 
  77.                 _data = self._file.read(h.plen)
  78.                 if data != _data:
  79.                     return 0
  80.                 
  81.                 return pos
  82.             
  83.             pos += h.recordlen()
  84.         return 0
  85.  
  86.     
  87.     def _restore_pnv(self, oid, prev, version, bp):
  88.         if not prev:
  89.             return None
  90.         
  91.         pnv = None
  92.         h = self._read_data_header(prev, oid)
  93.         if h.version:
  94.             return h.pnv
  95.         elif bp:
  96.             h2 = self._read_data_header(bp, oid)
  97.             if h2.version:
  98.                 return h2.pnv
  99.             else:
  100.                 warn('restore could not find previous non-version data at %d or %d', prev, bp)
  101.                 return None
  102.         
  103.  
  104.     
  105.     def _resolve_backpointer(self, prev_txn, oid, data):
  106.         prev_pos = 0
  107.         if prev_txn is not None:
  108.             prev_txn_pos = self._txn_find(prev_txn, 0)
  109.             if prev_txn_pos:
  110.                 prev_pos = self._data_find(prev_txn_pos, oid, data)
  111.             
  112.         
  113.         return prev_pos
  114.  
  115.     
  116.     def copy(self, oid, serial, data, version, prev_txn, txnpos, datapos):
  117.         prev_pos = self._resolve_backpointer(prev_txn, oid, data)
  118.         old = self._index.get(oid, 0)
  119.         here = datapos
  120.         self._tindex[oid] = here
  121.         if prev_pos:
  122.             data = None
  123.         
  124.         if data is None:
  125.             dlen = 0
  126.         else:
  127.             dlen = len(data)
  128.         h = DataHeader(oid, serial, old, txnpos, len(version), dlen)
  129.         if version:
  130.             h.version = version
  131.             pnv = self._restore_pnv(oid, old, version, prev_pos)
  132.             if pnv is not None:
  133.                 h.pnv = pnv
  134.             else:
  135.                 h.pnv = old
  136.             h.vprev = self._tvindex.get(version, 0)
  137.             if not h.vprev:
  138.                 h.vprev = self._vindex.get(version, 0)
  139.             
  140.             self._tvindex[version] = here
  141.         
  142.         self._tfile.write(h.asString())
  143.         if data is None:
  144.             if prev_pos:
  145.                 self._tfile.write(p64(prev_pos))
  146.             else:
  147.                 self._tfile.write(z64)
  148.         else:
  149.             self._tfile.write(data)
  150.  
  151.  
  152.  
  153. class GC(FileStorageFormatter):
  154.     
  155.     def __init__(self, file, eof, packtime):
  156.         self._file = file
  157.         self._name = file.name
  158.         self.eof = eof
  159.         self.packtime = packtime
  160.         self.packpos = None
  161.         self.oid2curpos = fsIndex()
  162.         self.oid2verpos = fsIndex()
  163.         self.reachable = fsIndex()
  164.         self.reach_ex = { }
  165.         self.ltid = z64
  166.  
  167.     
  168.     def isReachable(self, oid, pos):
  169.         '''Return 1 if revision of `oid` at `pos` is reachable.'''
  170.         rpos = self.reachable.get(oid)
  171.         if rpos is None:
  172.             return 0
  173.         
  174.         if rpos == pos:
  175.             return 1
  176.         
  177.         return pos in self.reach_ex.get(oid, [])
  178.  
  179.     
  180.     def findReachable(self):
  181.         self.buildPackIndex()
  182.         self.findReachableAtPacktime([
  183.             z64])
  184.         self.findReachableFromFuture()
  185.         del self.oid2verpos
  186.         del self.oid2curpos
  187.  
  188.     
  189.     def buildPackIndex(self):
  190.         pos = 0x4L
  191.         unpacked = False
  192.         while pos < self.eof:
  193.             th = self._read_txn_header(pos)
  194.             if th.tid > self.packtime:
  195.                 break
  196.             
  197.             self.checkTxn(th, pos)
  198.             if th.status != 'p':
  199.                 unpacked = True
  200.             
  201.             tpos = pos
  202.             end = pos + th.tlen
  203.             pos += th.headerlen()
  204.             while pos < end:
  205.                 dh = self._read_data_header(pos)
  206.                 self.checkData(th, tpos, dh, pos)
  207.                 if dh.version:
  208.                     self.oid2verpos[dh.oid] = pos
  209.                 else:
  210.                     self.oid2curpos[dh.oid] = pos
  211.                 pos += dh.recordlen()
  212.             tlen = self._read_num(pos)
  213.             if tlen != th.tlen:
  214.                 self.fail(pos, 'redundant transaction length does not match initial transaction length: %d != %d', tlen, th.tlen)
  215.             
  216.             pos += 8
  217.         self.packpos = pos
  218.         if unpacked:
  219.             return None
  220.         
  221.         
  222.         try:
  223.             th = self._read_txn_header(pos)
  224.         except CorruptedDataError:
  225.             err = None
  226.             if err.buf != '':
  227.                 raise 
  228.             
  229.         except:
  230.             err.buf != ''
  231.  
  232.         if th.status == 'p':
  233.             RedundantPackWarning = RedundantPackWarning
  234.             import ZODB.FileStorage.FileStorage
  235.             raise RedundantPackWarning('The database has already been packed to a later time or no changes have been made since the last pack')
  236.         
  237.  
  238.     
  239.     def findReachableAtPacktime(self, roots):
  240.         '''Mark all objects reachable from the oids in roots as reachable.'''
  241.         todo = list(roots)
  242.         while todo:
  243.             oid = todo.pop()
  244.             if self.reachable.has_key(oid):
  245.                 continue
  246.             
  247.             L = []
  248.             pos = self.oid2curpos.get(oid)
  249.             if pos is not None:
  250.                 L.append(pos)
  251.                 todo.extend(self.findrefs(pos))
  252.             
  253.             pos = self.oid2verpos.get(oid)
  254.             if pos is not None:
  255.                 L.append(pos)
  256.                 todo.extend(self.findrefs(pos))
  257.             
  258.             if not L:
  259.                 continue
  260.             
  261.             pos = L.pop()
  262.             self.reachable[oid] = pos
  263.             if L:
  264.                 self.reach_ex[oid] = L
  265.                 continue
  266.  
  267.     
  268.     def findReachableFromFuture(self):
  269.         extra_roots = []
  270.         pos = self.packpos
  271.         while pos < self.eof:
  272.             th = self._read_txn_header(pos)
  273.             self.checkTxn(th, pos)
  274.             tpos = pos
  275.             end = pos + th.tlen
  276.             pos += th.headerlen()
  277.             while pos < end:
  278.                 dh = self._read_data_header(pos)
  279.                 self.checkData(th, tpos, dh, pos)
  280.                 if dh.back and dh.back < self.packpos:
  281.                     if self.reachable.has_key(dh.oid):
  282.                         L = self.reach_ex.setdefault(dh.oid, [])
  283.                         if dh.back not in L:
  284.                             L.append(dh.back)
  285.                             extra_roots.append(dh.back)
  286.                         
  287.                     else:
  288.                         self.reachable[dh.oid] = dh.back
  289.                 
  290.                 if dh.version and dh.pnv:
  291.                     if self.reachable.has_key(dh.oid):
  292.                         L = self.reach_ex.setdefault(dh.oid, [])
  293.                         if dh.pnv not in L:
  294.                             L.append(dh.pnv)
  295.                             extra_roots.append(dh.pnv)
  296.                         
  297.                     else:
  298.                         self.reachable[dh.oid] = dh.back
  299.                 
  300.                 pos += dh.recordlen()
  301.             tlen = self._read_num(pos)
  302.             if tlen != th.tlen:
  303.                 self.fail(pos, 'redundant transaction length does not match initial transaction length: %d != %d', tlen, th.tlen)
  304.             
  305.             pos += 8
  306.         for pos in extra_roots:
  307.             refs = self.findrefs(pos)
  308.             self.findReachableAtPacktime(refs)
  309.         
  310.  
  311.     
  312.     def findrefs(self, pos):
  313.         '''Return a list of oids referenced as of packtime.'''
  314.         dh = self._read_data_header(pos)
  315.         while dh.back:
  316.             dh = self._read_data_header(dh.back)
  317.         if dh.plen:
  318.             return referencesf(self._file.read(dh.plen))
  319.         else:
  320.             return []
  321.  
  322.  
  323.  
  324. class PackCopier(DataCopier):
  325.     
  326.     def __init__(self, f, index, vindex, tindex, tvindex):
  327.         self._file = f
  328.         self._tfile = f
  329.         self._index = index
  330.         self._vindex = vindex
  331.         self._tindex = tindex
  332.         self._tvindex = tvindex
  333.         self._pos = None
  334.  
  335.     
  336.     def setTxnPos(self, pos):
  337.         self._pos = pos
  338.  
  339.     
  340.     def _resolve_backpointer(self, prev_txn, oid, data):
  341.         pos = self._tfile.tell()
  342.         
  343.         try:
  344.             return DataCopier._resolve_backpointer(self, prev_txn, oid, data)
  345.         finally:
  346.             self._tfile.seek(pos)
  347.  
  348.  
  349.     
  350.     def _restore_pnv(self, oid, prev, version, bp):
  351.         pos = self._tfile.tell()
  352.         
  353.         try:
  354.             return DataCopier._restore_pnv(self, oid, prev, version, bp)
  355.         finally:
  356.             self._tfile.seek(pos)
  357.  
  358.  
  359.  
  360.  
  361. class FileStoragePacker(FileStorageFormatter):
  362.     
  363.     def __init__(self, path, stop, la, lr, cla, clr, current_size):
  364.         self._name = path
  365.         self._file = open(path, 'rb')
  366.         self._path = path
  367.         self._stop = stop
  368.         self.locked = 0
  369.         self.file_end = current_size
  370.         self.gc = GC(self._file, self.file_end, self._stop)
  371.         self._lock_acquire = la
  372.         self._lock_release = lr
  373.         self._commit_lock_acquire = cla
  374.         self._commit_lock_release = clr
  375.         self.index = fsIndex()
  376.         self.vindex = { }
  377.         self.tindex = { }
  378.         self.tvindex = { }
  379.         self.oid2tid = { }
  380.         self.toid2tid = { }
  381.         self.toid2tid_delete = { }
  382.         self.nvindex = fsIndex()
  383.  
  384.     
  385.     def pack(self):
  386.         self.gc.findReachable()
  387.         self._tfile = open(self._name + '.pack', 'w+b')
  388.         self._file.seek(0)
  389.         self._tfile.write(self._file.read(self._metadata_size))
  390.         self._copier = PackCopier(self._tfile, self.index, self.vindex, self.tindex, self.tvindex)
  391.         (ipos, opos) = self.copyToPacktime()
  392.         if not ipos == self.gc.packpos:
  393.             raise AssertionError
  394.         if ipos == opos:
  395.             self._tfile.close()
  396.             self._file.close()
  397.             os.remove(self._name + '.pack')
  398.             return None
  399.         
  400.         self._commit_lock_acquire()
  401.         self.locked = 1
  402.         self._lock_acquire()
  403.         
  404.         try:
  405.             self._file.close()
  406.             self._file = open(self._path, 'rb', 0)
  407.             self._file.seek(0, 2)
  408.             self.file_end = self._file.tell()
  409.         finally:
  410.             self._lock_release()
  411.  
  412.         if ipos < self.file_end:
  413.             self.copyRest(ipos)
  414.         
  415.         pos = self._tfile.tell()
  416.         self._tfile.flush()
  417.         self._tfile.close()
  418.         self._file.close()
  419.         return pos
  420.  
  421.     
  422.     def copyToPacktime(self):
  423.         offset = 0x0L
  424.         pos = self._metadata_size
  425.         new_pos = pos
  426.         while pos < self.gc.packpos:
  427.             th = self._read_txn_header(pos)
  428.             (new_tpos, pos) = self.copyDataRecords(pos, th)
  429.             if new_tpos:
  430.                 new_pos = self._tfile.tell() + 8
  431.                 tlen = new_pos - new_tpos - 8
  432.                 self._tfile.seek(new_tpos + 8)
  433.                 self._tfile.write(p64(tlen))
  434.                 self._tfile.seek(new_pos - 8)
  435.                 self._tfile.write(p64(tlen))
  436.             
  437.             tlen = self._read_num(pos)
  438.             if tlen != th.tlen:
  439.                 self.fail(pos, 'redundant transaction length does not match initial transaction length: %d != %d', tlen, th.tlen)
  440.             
  441.             pos += 8
  442.         return (pos, new_pos)
  443.  
  444.     
  445.     def fetchBackpointer(self, oid, back):
  446.         '''Return data and refs backpointer `back` to object `oid.
  447.  
  448.         If `back` is 0 or ultimately resolves to 0, return None
  449.         and None.  In this case, the transaction undoes the object
  450.         creation.
  451.         '''
  452.         if back == 0:
  453.             return None
  454.         
  455.         (data, tid) = self._loadBackTxn(oid, back, 0)
  456.         return data
  457.  
  458.     
  459.     def copyDataRecords(self, pos, th):
  460.         '''Copy any current data records between pos and tend.
  461.  
  462.         Returns position of txn header in output file and position
  463.         of next record in the input file.
  464.  
  465.         If any data records are copied, also write txn header (th).
  466.         '''
  467.         copy = 0
  468.         new_tpos = 0x0L
  469.         tend = pos + th.tlen
  470.         pos += th.headerlen()
  471.         while pos < tend:
  472.             h = self._read_data_header(pos)
  473.             if not self.gc.isReachable(h.oid, pos):
  474.                 pos += h.recordlen()
  475.                 continue
  476.             
  477.             pos += h.recordlen()
  478.             if not copy:
  479.                 th.status = 'p'
  480.                 s = th.asString()
  481.                 new_tpos = self._tfile.tell()
  482.                 self._tfile.write(s)
  483.                 new_pos = new_tpos + len(s)
  484.                 copy = 1
  485.             
  486.             if h.plen:
  487.                 data = self._file.read(h.plen)
  488.             else:
  489.                 data = self.fetchBackpointer(h.oid, h.back)
  490.             self.writePackedDataRecord(h, data, new_tpos)
  491.             new_pos = self._tfile.tell()
  492.         return (new_tpos, pos)
  493.  
  494.     
  495.     def writePackedDataRecord(self, h, data, new_tpos):
  496.         if data is None:
  497.             data = ''
  498.         
  499.         h.prev = 0
  500.         h.back = 0
  501.         h.plen = len(data)
  502.         h.tloc = new_tpos
  503.         pos = self._tfile.tell()
  504.         if h.version:
  505.             h.pnv = self.index.get(h.oid, 0)
  506.             h.vprev = self.vindex.get(h.version, 0)
  507.             self.vindex[h.version] = pos
  508.         
  509.         self.index[h.oid] = pos
  510.         if h.version:
  511.             self.vindex[h.version] = pos
  512.         
  513.         self._tfile.write(h.asString())
  514.         self._tfile.write(data)
  515.         if not data:
  516.             self._tfile.write(z64)
  517.         
  518.  
  519.     
  520.     def copyRest(self, ipos):
  521.         self._lock_counter = 0
  522.         
  523.         try:
  524.             while None:
  525.                 ipos = self.copyOne(ipos)
  526.         except CorruptedDataError:
  527.             err = None
  528.             self._file.seek(0, 2)
  529.             endpos = self._file.tell()
  530.             if endpos != err.pos:
  531.                 raise 
  532.             
  533.         except:
  534.             endpos != err.pos
  535.  
  536.  
  537.     
  538.     def copyOne(self, ipos):
  539.         th = self._read_txn_header(ipos)
  540.         self._lock_counter += 1
  541.         if self._lock_counter % 20 == 0:
  542.             self._commit_lock_release()
  543.         
  544.         pos = self._tfile.tell()
  545.         self._copier.setTxnPos(pos)
  546.         self._tfile.write(th.asString())
  547.         tend = ipos + th.tlen
  548.         ipos += th.headerlen()
  549.         while ipos < tend:
  550.             h = self._read_data_header(ipos)
  551.             ipos += h.recordlen()
  552.             prev_txn = None
  553.             if h.plen:
  554.                 data = self._file.read(h.plen)
  555.             else:
  556.                 data = self.fetchBackpointer(h.oid, h.back)
  557.                 if h.back:
  558.                     prev_txn = self.getTxnFromData(h.oid, h.back)
  559.                 
  560.             self._copier.copy(h.oid, h.tid, data, h.version, prev_txn, pos, self._tfile.tell())
  561.         tlen = self._tfile.tell() - pos
  562.         if not tlen == th.tlen:
  563.             raise AssertionError
  564.         self._tfile.write(p64(tlen))
  565.         ipos += 8
  566.         self.index.update(self.tindex)
  567.         self.tindex.clear()
  568.         self.vindex.update(self.tvindex)
  569.         self.tvindex.clear()
  570.         if self._lock_counter % 20 == 0:
  571.             self._commit_lock_acquire()
  572.         
  573.         return ipos
  574.  
  575.  
  576.